//! [`tokio`] runtime components integration for [`hyper`].
//!
//! [`hyper::rt`] exposes a set of traits to allow hyper to be agnostic to
//! its underlying asynchronous runtime. This submodule provides glue for
//! [`tokio`] users to bridge those types to [`hyper`]'s interfaces.
//!
//! # IO
//!
//! [`hyper`] abstracts over asynchronous readers and writers using [`Read`]
//! and [`Write`], while [`tokio`] abstracts over this using [`AsyncRead`]
//! and [`AsyncWrite`]. This submodule provides a collection of IO adaptors
//! to bridge these two IO ecosystems together: [`TokioIo<I>`],
//! [`WithHyperIo<I>`], and [`WithTokioIo<I>`].
//!
//! To compare and constrast these IO adaptors and to help explain which
//! is the proper choice for your needs, here is a table showing which IO
//! traits these implement, given two types `T` and `H` which implement
//! Tokio's and Hyper's corresponding IO traits:
//!
//! |                    | [`AsyncRead`]    | [`AsyncWrite`]    | [`Read`]     | [`Write`]    |
//! |--------------------|------------------|-------------------|--------------|--------------|
//! | `T`                | ✅ **true**      | ✅ **true**       | ❌ **false** | ❌ **false** |
//! | `H`                | ❌ **false**     | ❌ **false**      | ✅ **true**  | ✅ **true**  |
//! | [`TokioIo<T>`]     | ❌ **false**     | ❌ **false**      | ✅ **true**  | ✅ **true**  |
//! | [`TokioIo<H>`]     | ✅ **true**      | ✅ **true**       | ❌ **false** | ❌ **false** |
//! | [`WithHyperIo<T>`] | ✅ **true**      | ✅ **true**       | ✅ **true**  | ✅ **true**  |
//! | [`WithHyperIo<H>`] | ❌ **false**     | ❌ **false**      | ❌ **false** | ❌ **false** |
//! | [`WithTokioIo<T>`] | ❌ **false**     | ❌ **false**      | ❌ **false** | ❌ **false** |
//! | [`WithTokioIo<H>`] | ✅ **true**      | ✅ **true**       | ✅ **true**  | ✅ **true**  |
//!
//! For most situations, [`TokioIo<I>`] is the proper choice. This should be
//! constructed, wrapping some underlying [`hyper`] or [`tokio`] IO, at the
//! call-site of a function like [`hyper::client::conn::http1::handshake`].
//!
//! [`TokioIo<I>`] switches across these ecosystems, but notably does not
//! preserve the existing IO trait implementations of its underlying IO. If
//! one wishes to _extend_ IO with additional implementations,
//! [`WithHyperIo<I>`] and [`WithTokioIo<I>`] are the correct choice.
//!
//! For example, a Tokio reader/writer can be wrapped in [`WithHyperIo<I>`].
//! That will implement _both_ sets of IO traits. Conversely,
//! [`WithTokioIo<I>`] will implement both sets of IO traits given a
//! reader/writer that implements Hyper's [`Read`] and [`Write`].
//!
//! See [`tokio::io`] and ["_Asynchronous IO_"][tokio-async-docs] for more
//! information.
//!
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Read`]: hyper::rt::Read
//! [`Write`]: hyper::rt::Write
//! [tokio-async-docs]: https://docs.rs/tokio/latest/tokio/#asynchronous-io

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use hyper::rt::{Executor, Sleep, Timer};
use pin_project_lite::pin_project;
#[cfg(feature = "tracing")]
use tracing::instrument::Instrument;

pub use self::with_hyper_io::WithHyperIo;
pub use self::with_tokio_io::WithTokioIo;

mod with_hyper_io;
mod with_tokio_io;

/// Future executor that utilises `tokio` threads.
#[non_exhaustive]
#[derive(Default, Debug, Clone)]
pub struct TokioExecutor {}

pin_project! {
		/// A wrapper that implements Tokio's IO traits for an inner type that
		/// implements hyper's IO traits, or vice versa (implements hyper's IO
		/// traits for a type that implements Tokio's IO traits).
		#[derive(Debug)]
		pub struct TokioIo<T> {
				#[pin]
				inner: T,
		}
}

/// A Timer that uses the tokio runtime.
#[non_exhaustive]
#[derive(Default, Clone, Debug)]
pub struct TokioTimer;

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pin_project! {
		#[derive(Debug)]
		struct TokioSleep {
				#[pin]
				inner: tokio::time::Sleep,
		}
}

// ===== impl TokioExecutor =====

impl<Fut> Executor<Fut> for TokioExecutor
where
	Fut: Future + Send + 'static,
	Fut::Output: Send + 'static,
{
	fn execute(&self, fut: Fut) {
		#[cfg(feature = "tracing")]
		tokio::spawn(fut.in_current_span());

		#[cfg(not(feature = "tracing"))]
		tokio::spawn(fut);
	}
}

impl TokioExecutor {
	/// Create new executor that relies on [`tokio::spawn`] to execute futures.
	pub fn new() -> Self {
		Self {}
	}
}

// ==== impl TokioIo =====

impl<T> TokioIo<T> {
	/// Wrap a type implementing Tokio's or hyper's IO traits.
	pub fn new(inner: T) -> Self {
		Self { inner }
	}

	/// Borrow the inner type.
	pub fn inner(&self) -> &T {
		&self.inner
	}

	/// Mut borrow the inner type.
	pub fn inner_mut(&mut self) -> &mut T {
		&mut self.inner
	}

	/// Consume this wrapper and get the inner type.
	pub fn into_inner(self) -> T {
		self.inner
	}
}

impl<T> hyper::rt::Read for TokioIo<T>
where
	T: tokio::io::AsyncRead,
{
	fn poll_read(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		mut buf: hyper::rt::ReadBufCursor<'_>,
	) -> Poll<Result<(), std::io::Error>> {
		let n = unsafe {
			let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
			match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
				Poll::Ready(Ok(())) => tbuf.filled().len(),
				other => return other,
			}
		};

		unsafe {
			buf.advance(n);
		}
		Poll::Ready(Ok(()))
	}
}

impl<T> hyper::rt::Write for TokioIo<T>
where
	T: tokio::io::AsyncWrite,
{
	fn poll_write(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &[u8],
	) -> Poll<Result<usize, std::io::Error>> {
		tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
	}

	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
		tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
	}

	fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
		tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
	}

	fn is_write_vectored(&self) -> bool {
		tokio::io::AsyncWrite::is_write_vectored(&self.inner)
	}

	fn poll_write_vectored(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		bufs: &[std::io::IoSlice<'_>],
	) -> Poll<Result<usize, std::io::Error>> {
		tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
	}
}

impl<T> tokio::io::AsyncRead for TokioIo<T>
where
	T: hyper::rt::Read,
{
	fn poll_read(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		tbuf: &mut tokio::io::ReadBuf<'_>,
	) -> Poll<Result<(), std::io::Error>> {
		// let init = tbuf.initialized().len();
		let filled = tbuf.filled().len();
		let sub_filled = unsafe {
			let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());

			match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
				Poll::Ready(Ok(())) => buf.filled().len(),
				other => return other,
			}
		};

		let n_filled = filled + sub_filled;
		// At least sub_filled bytes had to have been initialized.
		let n_init = sub_filled;
		unsafe {
			tbuf.assume_init(n_init);
			tbuf.set_filled(n_filled);
		}

		Poll::Ready(Ok(()))
	}
}

impl<T> tokio::io::AsyncWrite for TokioIo<T>
where
	T: hyper::rt::Write,
{
	fn poll_write(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &[u8],
	) -> Poll<Result<usize, std::io::Error>> {
		hyper::rt::Write::poll_write(self.project().inner, cx, buf)
	}

	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
		hyper::rt::Write::poll_flush(self.project().inner, cx)
	}

	fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
		hyper::rt::Write::poll_shutdown(self.project().inner, cx)
	}

	fn is_write_vectored(&self) -> bool {
		hyper::rt::Write::is_write_vectored(&self.inner)
	}

	fn poll_write_vectored(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		bufs: &[std::io::IoSlice<'_>],
	) -> Poll<Result<usize, std::io::Error>> {
		hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
	}
}

// ==== impl TokioTimer =====

impl Timer for TokioTimer {
	fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
		Box::pin(TokioSleep {
			inner: tokio::time::sleep(duration),
		})
	}

	fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
		Box::pin(TokioSleep {
			inner: tokio::time::sleep_until(deadline.into()),
		})
	}

	fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
		if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
			sleep.reset(new_deadline)
		}
	}
}

impl TokioTimer {
	/// Create a new TokioTimer
	pub fn new() -> Self {
		Self {}
	}
}

impl Future for TokioSleep {
	type Output = ();

	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		self.project().inner.poll(cx)
	}
}

impl Sleep for TokioSleep {}

impl TokioSleep {
	fn reset(self: Pin<&mut Self>, deadline: Instant) {
		self.project().inner.as_mut().reset(deadline.into());
	}
}

#[cfg(test)]
mod tests {
	use hyper::rt::Executor;
	use tokio::sync::oneshot;

	use crate::rt::TokioExecutor;

	#[cfg(not(miri))]
	#[tokio::test]
	async fn simple_execute() -> Result<(), Box<dyn std::error::Error>> {
		let (tx, rx) = oneshot::channel();
		let executor = TokioExecutor::new();
		executor.execute(async move {
			tx.send(()).unwrap();
		});
		rx.await.map_err(Into::into)
	}
}
